package org.idea.mq.redis.framework.mq.list;

import org.idea.mq.redis.framework.bean.MsgWrapper;
import org.idea.mq.redis.framework.mq.IMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.util.List;

/**
 * Message-arrival callback backed by a Redis list.
 * <p>
 * Known limitations (noted by the original author): messages have to be fetched
 * by continuously asking Redis for them (the blocking pop is used to cut down on
 * network overhead), throughput is modest, and there is no retry mechanism once
 * a message fails to be consumed.
 * Redis processes I/O with a single-threaded model, so several consumers will
 * not receive the same message.
 *
 * @Author linhao
 * @Date created in 3:20 PM 2022/2/7
 */
@Component
public class RedisListMQListener implements IMQListener {

    /** Pause between two non-blocking pops in {@link #pollingGet}, in milliseconds. */
    private static final long POLLING_INTERVAL_MILLIS = 100L;

    /** Pause after a failed Redis call before the next attempt, in milliseconds. */
    private static final long FAILURE_BACKOFF_MILLIS = 1000L;

    /** Name given to every consumer thread so it is recognizable in thread dumps. */
    private static final String CONSUMER_THREAD_NAME = "redis-list-mq-consumer";

    @Resource
    private IRedisService iRedisService;

    /**
     * Starts a dedicated consumer thread that keeps reading messages for the
     * topic carried by {@code msgWrapper}.
     * <p>
     * Every call starts a new thread. The thread runs until it is interrupted.
     *
     * @param msgWrapper wrapper providing the topic to consume from; when
     *                   {@code null} the call is reported on stderr and ignored
     */
    @Override
    public void onMessageReach(MsgWrapper msgWrapper) {
        if (msgWrapper == null) {
            // Without a topic the consumer loop could only fail; do not start it.
            System.err.println("RedisListMQListener: msgWrapper is null, consumer not started");
            return;
        }
        Thread consumer = new Thread(() -> blockGet(msgWrapper), CONSUMER_THREAD_NAME);
        consumer.start();
    }


    /**
     * Fetches messages with the blocking pop ({@code brpop}) and prints each
     * returned value to stdout.
     * <p>
     * The loop ends when the current thread is interrupted. An unchecked
     * exception thrown by the Redis call no longer terminates the consumer:
     * it is reported on stderr and the call is retried after
     * {@link #FAILURE_BACKOFF_MILLIS}.
     *
     * @param msgWrapper wrapper providing the topic to consume from
     */
    private void blockGet(MsgWrapper msgWrapper) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                List<String> values = iRedisService.brpop(msgWrapper.getTopic());
                if (!CollectionUtils.isEmpty(values)) {
                    values.forEach(value -> System.out.println(value));
                }
            } catch (RuntimeException e) {
                System.err.println("RedisListMQListener: consuming failed, will retry: " + e);
                // Back off so a persistent failure does not become a busy loop.
                if (!pause(FAILURE_BACKOFF_MILLIS)) {
                    return;
                }
            }
        }
    }

    /**
     * Fetches messages by polling: pops one value with {@code rpop}, prints it
     * to stdout when it is non-empty, then waits
     * {@link #POLLING_INTERVAL_MILLIS} before the next attempt.
     * <p>
     * The loop ends when the current thread is interrupted.
     *
     * @param msgWrapper wrapper providing the topic to consume from
     */
    private void pollingGet(MsgWrapper msgWrapper) {
        while (!Thread.currentThread().isInterrupted()) {
            String value = iRedisService.rpop(msgWrapper.getTopic());
            if (StringUtils.hasLength(value)) {
                System.out.println(value);
            }
            if (!pause(POLLING_INTERVAL_MILLIS)) {
                return;
            }
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis how long to sleep, in milliseconds
     * @return {@code true} when the sleep completed; {@code false} when the
     *         thread was interrupted, in which case the interrupt flag is
     *         restored so callers can stop their loop
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

}
